Java UDF

本文为您介绍如何编写和使用UDF。

背景信息

自2.2.0版本起,StarRocks支持使用Java语言编写用户定义函数(User Defined Function,简称UDF)。

自3.0版本起,StarRocks支持Global UDF,您只需要在相关的SQL语句(CREATE/SHOW/DROP)中加上GLOBAL关键字,该语句即可全局生效,无需逐个为每个数据库执行此语句。您可以根据业务场景开发自定义函数,扩展StarRocks的函数能力。

目前StarRocks支持的UDF包括:

  • 用户自定义标量函数(Scalar UDF)

  • 用户自定义聚合函数(User Defined Aggregation Function,UDAF)

  • 用户自定义窗口函数(User Defined Window Function,UDWF)

  • 用户自定义表格函数(User Defined Table Function,UDTF)

前提条件

使用StarRocks的Java UDF功能前,您需要:

  • 安装Apache Maven以创建并编写相关Java项目。

  • 在服务器上安装JDK 1.8。

  • 开启UDF功能。在实例配置页面,设置FE配置项enable_udfTRUE,并重启实例使配置项生效。

类型映射关系

SQL TYPE

Java TYPE

BOOLEAN

java.lang.Boolean

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

INT

java.lang.Integer

BIGINT

java.lang.Long

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

STRING/VARCHAR

java.lang.String

开发并使用UDF

您需要创建Maven项目并使用Java语言编写相应功能。

步骤一:创建Maven项目

创建Maven项目,项目的基本目录结构如下。

project
|--pom.xml
|--src
|  |--main
|  |  |--java
|  |  |--resources
|  |--test
|--target

步骤二:添加依赖

在pom.xml中添加如下依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>udf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

步骤三:开发UDF

您需要使用Java语言开发相应UDF。

开发Scalar UDF

Scalar UDF,即用户自定义标量函数,可以对单行数据进行操作,输出单行结果。当您在查询时使用Scalar UDF,每行数据最终都会按行出现在结果集中。典型的标量函数包括UPPERLOWERROUNDABS

以下示例以提取JSON数据功能为例进行说明。例如,业务场景中,JSON数据中某个字段的值可能是JSON字符串而不是JSON对象,因此在提取JSON字符串时,SQL语句需要嵌套调用GET_JSON_STRING,即GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")

为简化SQL语句,您可以开发一个UDF,直接提取JSON字符串,例如:MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")

package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;

public class UDFJsonGet {
    public final String evaluate(String jsonObj, String key) {
        if (obj == null || key == null) return null;
        try {
            // JSONPath库可以全部展开,即使某个字段的值是JSON格式的字符串
            return JSONPath.read(jsonObj, key).toString();
        } catch (Exception e) {
            return null;
        }
    }
}

用户自定义类必须实现如下方法。

说明

方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION语句中声明的相同,且两者的类型映射关系需要符合类型映射关系

方法

含义

TYPE1 evaluate(TYPE2, ...)

evaluate方法为UDF调用入口,必须是public成员方法。

开发UDAF

UDAF,即用户自定义的聚合函数,对多行数据进行操作,输出单行结果。典型的聚合函数包括SUMCOUNTMAXMIN,这些函数对于每个GROUP BY分组中多行数据进行聚合后,只输出一行结果。

以下示例以MY_SUM_INT函数为例进行说明。与内置函数SUM(返回值为BIGINT类型)区别在于,MY_SUM_INT函数支持传入参数和返回参数的类型为INT。

package com.starrocks.udf.sample;

public class SumInt {
    public static class State {
        int counter = 0;
        public int serializeLength() { return 4; }
    }

    public State create() {
        return new State();
    }

    public void destroy(State state) {
    }

    public final void update(State state, Integer val) {
        if (val != null) {
            state.counter+= val;
        }
    }

    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter);
    }

    public void merge(State state, java.nio.ByteBuffer buffer) {
        int val = buffer.getInt();
        state.counter += val;
    }

    public Integer finalize(State state) {
        return state.counter;
    }
}

用户自定义类必须实现如下方法。

说明

方法中传入参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION语句中声明的相同,且两者的类型映射关系需要符合类型映射关系

方法

含义

State create()

创建State。

void destroy(State)

销毁State。

void update(State, ...)

更新State。其中第一个参数是State,其余的参数是函数声明的输入参数,可以为1个或多个。

void serialize(State, ByteBuffer)

序列化State。

void merge(State, ByteBuffer)

合并State和反序列化State。

TYPE finalize(State)

通过State获取函数的最终结果。

并且,开发UDAF函数时,您需要使用缓冲区类java.nio.ByteBuffer和局部变量serializeLength,用于保存和表示中间结果,指定中间结果的序列化长度。

类和局部变量

说明

java.nio.ByteBuffer()

缓冲区类,用于保存中间结果。由于中间结果在不同执行节点间传输时,会进行序列化和反序列化,因此还需要使用serializeLength指定中间结果序列化后的长度。

serializeLength()

中间结果序列化后的长度,单位为Byte。serializeLength的数据类型固定为INT。例如,示例中State { int counter = 0; public int serializeLength() { return 4; }}包含对中间结果序列化后的说明,即,中间结果的数据类型为INT,序列化长度为4 Byte。您也可以按照业务需求进行调整,例如中间结果序列化后的数据类型LONG,序列化长度为8 Byte,则需要传入State { long counter = 0; public int serializeLength() { return 8; }}

说明

java.nio.ByteBuffer序列化相关事项:

  • 不支持依赖ByteBuffer的remaining()方法来反序列化State。

  • 不支持对ByteBuffer调用clear()方法。

  • serializeLength需要与实际写入数据的长度保持一致,否则序列化和反序列化过程中会造成结果错误。

开发UDWF

UDWF,即用户自定义窗口函数。跟普通聚合函数不同的是,窗口函数针对一组行(一个窗口)计算值,并为每行返回一个结果。一般情况下,窗口函数包含OVER子句,将数据行拆分成多个分组,窗口函数基于每一行数据所在的组(一个窗口)进行计算,并为每行返回一个结果。

以下示例以MY_WINDOW_SUM_INT函数为例进行说明。与内置函数SUM(返回类型为BIGINT)区别在于,MY_WINDOW_SUM_INT函数支持传入参数和返回参数的类型为INT。

package com.starrocks.udf.sample;

public class WindowSumInt {    
    public static class State {
        int counter = 0;
        public int serializeLength() { return 4; }
        @Override
        public String toString() {
            return "State{" +
                    "counter=" + counter +
                    '}';
        }
    }

    public State create() {
        return new State();
    }

    public void destroy(State state) {

    }

    public void update(State state, Integer val) {
        if (val != null) {
            state.counter+=val;
        }
    }

    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter);
    }

    public void merge(State state, java.nio.ByteBuffer buffer) {
        int val = buffer.getInt();
        state.counter += val;
    }

    public Integer finalize(State state) {
        return state.counter;
    }

    public void reset(State state) {
        state.counter = 0;
    }

    public void windowUpdate(State state,
                            int peer_group_start, int peer_group_end,
                            int frame_start, int frame_end,
                            Integer[] inputs) {
        for (int i = (int)frame_start; i < (int)frame_end; ++i) {
            state.counter += inputs[i];
        }
    }
}

用户自定义类必须实现UDAF所需要的方法(窗口函数是特殊聚合函数)以及windowUpdate()方法。

说明

方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION语句中声明的相同,且两者的类型映射关系需要符合类型映射关系

需要额外实现的方法

方法

含义

void windowUpdate(State state, int, int, int , int, ...)

更新窗口数据。窗口函数的详细说明,请参见窗口函数。输入每一行数据,都会获取到对应窗口信息来更新中间结果。

  • peer_group_start:是当前分区开始的位置。

    分区:OVER子句中PARTITION BY指定分区列,分区列的值相同的行被视为在同一个分区内。

  • peer_group_end:当前分区结束的位置。

  • frame_start:当前窗口框架(window frame)起始位置。

    窗口框架:window frame子句指定了运算范围,以当前行为准,前后若干行作为窗口函数运算的对象。例如ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING,表示运算范围为当前行和它前后各一行数据。

  • frame_end:当前窗口框架(window frame)结束位置。

  • inputs:表示一个窗口中输入的数据,为包装类数组。包装类需要对应输入数据的类型,本示例中输入数据类型为INT,因此包装类数组为Integer[]。

开发UDTF

UDTF,即用户自定义表值函数,读入一行数据,输出多个值可视为一张表。表值函数常用于实现行转列。

说明

目前UDTF只支持返回多行单列。

以下示例以MY_UDF_SPLIT函数为例进行说明。MY_UDF_SPLIT函数支持分隔符为空格,传入参数和返回参数的类型为STRING。

package com.starrocks.udf.sample;

public class UDFSplit{
    public String[] process(String in) {
        if (in == null) return null;
        return in.split(" ");
    }
}

用户自定义类必须实现如下方法。

说明

方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION语句中声明的相同,且两者的类型映射关系需要符合类型映射关系

方法

含义

TYPE[] process()

process()方法为UDTF调用入口,需要返回数组。

步骤四:打包Java项目

通过以下命令打包Java项目。

mvn package

target目录下会生成两个文件:udf-1.0-SNAPSHOT.jarudf-1.0-SNAPSHOT-jar-with-dependencies.jar

步骤五:上传项目

将文件udf-1.0-SNAPSHOT-jar-with-dependencies.jar上传到OSS上,并开放JAR包的公共读权限。详情请参见简单上传设置Bucket ACL

说明

步骤六中,FE会对UDF所在JAR包进行校验并计算校验值,BE会下载UDF所在JAR包并执行。

步骤六:在StarRocks中创建UDF

StarRocks内提供了两种Namespace的UDF:一种是Database级Namespace,一种是Global级Namespace。

  • 如果您没有特殊的UDF可见性隔离需求,您可以直接选择创建Global UDF。在引用Global UDF时,直接调用Function Name即可,无需任何Catalog和Database作为前缀,访问更加便捷。

  • 如果您有特殊的UDF可见性隔离需求,或者需要在不同Database下创建同名UDF,那么你可以选择在Database内创建UDF。此时,如果您的会话在某个Database内,您可以直接调用Function Name即可;如果您的会话在其他Catalog和Database下,那么您需要带上Catalog和Database前缀,例如:catalog.database.function

说明

创建Global UDF需要有System级的CREATE GLOBAL FUNCTION权限;创建数据库级别的UDF需要有数据库级的CREATE FUNCTION权限;使用UDF需要有对应UDF的USAGE权限。关于如何赋权,参见GRANT

JAR包上传完成后,您需要在StarRocks中,按需创建相应的UDF。如果创建Global UDF,只需要在SQL语句中带上GLOBAL关键字即可。

语法

CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]

参数说明

参数

必选

说明

GLOBAL

如需创建全局UDF,需指定该关键字。从3.0版本开始支持。

AGGREGATE

如要创建UDAF和UDWF,需指定该关键字。

TABLE

如要创建UDTF,需指定该关键字。

function_name

函数名,可以包含数据库名称,比如,db1.my_func。如果function_name中包含了数据库名称,那么该UDF会创建在对应的数据库中,否则该UDF会创建在当前数据库。新函数名和参数不能与目标数据库中已有的函数相同,否则会创建失败;如只有函数名相同,参数不同,则可以创建成功。

arg_type

函数的参数类型。具体支持的数据类型,请参见类型映射关系

return_type

函数的返回值类型。具体支持的数据类型,请参见类型映射关系

properties

函数相关属性。创建不同类型的UDF需配置不同的属性,详情和示例请参考以下示例。

创建Scalar UDF

执行如下命令,在StarRocks中创建之前示例中的Scalar UDF。

CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string) 
RETURNS string
PROPERTIES (
    "symbol" = "com.starrocks.udf.sample.UDFJsonGet", 
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

参数

描述

symbol

UDF所在项目的类名。格式为<package_name>.<class_name>

type

用于标记所创建的UDF类型。取值为StarrocksJar,表示基于Java的UDF。

file

UDF所在JAR包的HTTP路径,配置成OSS包含对应内网Endpoint的HTTP URL。格式为http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/<jar_package_name>

创建UDAF

执行如下命令,在StarRocks中创建之前示例中的UDAF。

CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT) 
RETURNS INT
PROPERTIES 
( 
    "symbol" = "com.starrocks.udf.sample.SumInt", 
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES里的参数说明与创建Scalar UDF相同。

创建UDWF

执行如下命令,在StarRocks中创建先前示例中的UDWF。

CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES 
(
    "analytic" = "true",
    "symbol" = "com.starrocks.udf.sample.WindowSumInt", 
    "type" = "StarrocksJar", 
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

analytic:所创建的函数是否为窗口函数,固定取值为true。其他参数说明与创建Scalar UDF相同。

创建UDTF

执行如下命令,在StarRocks中创建先前示例中的UDTF。

CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES 
(
    "symbol" = "com.starrocks.udf.sample.UDFSplit", 
    "type" = "StarrocksJar", 
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES里的参数说明与创建Scalar UDF相同。

步骤七:使用UDF

创建完成后,您可以测试使用您开发的UDF。

使用Scalar UDF

执行如下命令,使用步骤六创建的Scalar UDF函数。

SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');

使用UDAF

执行如下命令,使用步骤六创建的UDAF函数。

SELECT MY_SUM_INT(col1);

使用UDWF

执行如下命令,使用步骤六创建的UDWF函数。

SELECT MY_WINDOW_SUM_INT(intcol) 
            OVER (PARTITION BY intcol2
                  ORDER BY intcol3
                  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;

使用UDTF

执行如下命令,使用先前示例中的UDTF。

-- 假设存在表 t1,其列 a、b、c1 信息如下。
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."

-- 使用 MY_UDF_SPLIT() 函数。
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1); 
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."
说明
  • 第一个MY_UDF_SPLIT为调用MY_UDF_SPLIT后生成的列别名。

  • 暂不支持使用AS t2(f1)的方式指定表格函数返回表的表别名和列别名。

查看UDF信息

运行以下命令查看UDF信息。

SHOW [GLOBAL] FUNCTIONS;

删除UDF

运行以下命令删除指定的UDF。

DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);

FAQ

Q:开发UDF时是否可以使用静态变量?不同UDF间的静态变量间否会互相影响?

A:支持在开发UDF时使用静态变量,且不同UDF间(即使类同名),静态变量是互相隔离的,不会互相影响。